/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.security.Permission;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.Scanner;
import java.util.zip.GZIPOutputStream;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.shell.Count;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.BlockInlineChecksumReader;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.DataTransferThrottler;
import org.apache.hadoop.util.InjectionEventCore;
import org.apache.hadoop.util.InjectionEventI;
import org.apache.hadoop.util.InjectionHandler;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;

/**
 * This class tests commands from DFSShell.
 */
public class TestDFSShell extends TestCase {
  private MiniDFSCluster cluster;
  static final String TEST_ROOT_DIR =
    new Path(System.getProperty("test.build.data","/tmp"))
    .toString().replace(' ', '+');

  static String getUserName(FileSystem fs) {
    if (fs instanceof DistributedFileSystem) {
      return ((DistributedFileSystem)fs).dfs.ugi.getUserName();
    }
    return System.getProperty("user.name");
  }
  static Path writeFile(FileSystem fs, Path f) throws IOException {
    DataOutputStream out = fs.create(f);
    out.writeBytes("dhruba: " + f);
    out.close();
    assertTrue(fs.exists(f));
    return f;
  }
  
  static Path writeFileContents(FileSystem fs, Path f, String data)
    throws IOException {
    return writeFileContents(fs, f, data, 0L);
  }
  
  static Path writeFileContents(FileSystem fs, Path f, String data,
      long offset)
    throws IOException {
    DataOutputStream out = fs.create(f);
    if (offset > 0) {
      // write some trash
      byte[] trash = new byte[(int)offset];
      out.write(trash);
    }
    out.writeUTF(data);
    out.close();
    assertTrue(fs.exists(f));
    return f;
  }

  static String readFileContents(FileSystem fs, Path f)
    throws IOException {
    return fs.open(f).readUTF();
  }

  static Path mkdir(FileSystem fs, Path p) throws IOException {
    assertTrue(fs.mkdirs(p));
    assertTrue(fs.exists(p));
    assertTrue(fs.getFileStatus(p).isDir());
    return p;
  }

  static File createLocalFile(File f) throws IOException {
    assertTrue(!f.exists());
    PrintWriter out = new PrintWriter(f);
    out.print("createLocalFile: " + f.getAbsolutePath());
    out.flush();
    out.close();
    assertTrue(f.exists());
    assertTrue(f.isFile());
    return f;
  }
  
  static void show(String s) {
    System.out.println(Thread.currentThread().getStackTrace()[2] + " " + s);
  }
  
  public void testCopyToLocalWithStartingOffset() throws Exception {
    Configuration conf = new Configuration();
    cluster = new MiniDFSCluster(conf, 2, true, null);
    FileSystem fs = cluster.getFileSystem();
    FileSystem localFs = FileSystem.getLocal(conf);
    FsShell shell = new FsShell();
    shell.setConf(conf);
    String good = "good content";
    try {
      Path directory = new Path("/dir");
      Path srcFile = new Path("/dir/file");
      Path destFile = new Path(TEST_ROOT_DIR, "file");
      assertTrue(fs.mkdirs(directory));
      assertTrue(fs.exists(directory));
      
      for (int offset : new int[]{0, 1}) {
        // clear files
        fs.delete(srcFile, true);
        localFs.delete(destFile, true);
        writeFileContents(fs, srcFile, good, offset);
        String[] args = {"-copyToLocal",
            "-start", Integer.toString(offset),
            srcFile.toUri().getPath(),
            TEST_ROOT_DIR};
        assertEquals(0, shell.run(args));
        assertTrue(localFs.exists(destFile));
        assertEquals("We should get " + good, good,
            readFileContents(localFs, destFile));
        if (offset > 0) {
          show("Test normal read");
          localFs.delete(destFile, true);
          args = new String[]{"-copyToLocal",
              srcFile.toUri().getPath(),
              TEST_ROOT_DIR};
          assertEquals(0, shell.run(args));
          assertTrue(localFs.exists(destFile));
          assertNotSame("We should not get " + good, good,
              readFileContents(localFs, destFile));
          show("Test negative offset read");
          localFs.delete(destFile, true);
          args = new String[]{"-copyToLocal",
              "-start",
              Long.toString(offset - fs.getFileStatus(srcFile).getLen()),
              srcFile.toUri().getPath(),
              TEST_ROOT_DIR};
          assertEquals(0, shell.run(args));
          assertTrue(localFs.exists(destFile));
          assertEquals("We should get " + good, good,
              readFileContents(localFs, destFile));
        } 
      }
    } finally {
      try {
        fs.close();
      } catch (Exception e) {
      }
      cluster.shutdown();
    }
  }

  public void testZeroSizeFile() throws IOException {
    Configuration conf = new Configuration();
    cluster = new MiniDFSCluster(conf, 2, true, null);
    FileSystem fs = cluster.getFileSystem();
    assertTrue("Not a HDFS: "+fs.getUri(),
               fs instanceof DistributedFileSystem);
    final DistributedFileSystem dfs = (DistributedFileSystem)fs;

    try {
      //create a zero size file
      final File f1 = new File(TEST_ROOT_DIR, "f1");
      assertTrue(!f1.exists());
      assertTrue(f1.createNewFile());
      assertTrue(f1.exists());
      assertTrue(f1.isFile());
      assertEquals(0L, f1.length());

      //copy to remote
      final Path root = mkdir(dfs, new Path("/test/zeroSizeFile"));
      final Path remotef = new Path(root, "dst");
      show("copy local " + f1 + " to remote " + remotef);
      dfs.copyFromLocalFile(false, false, false, new Path(f1.getPath()), remotef);

      //getBlockSize() should not throw exception
      show("Block size = " + dfs.getFileStatus(remotef).getBlockSize());

      //copy back
      final File f2 = new File(TEST_ROOT_DIR, "f2");
      assertTrue(!f2.exists());
      dfs.copyToLocalFile(false, false, remotef, new Path(f2.getPath()));
      assertTrue(f2.exists());
      assertTrue(f2.isFile());
      assertEquals(0L, f2.length());

      f1.delete();
      f2.delete();
    } finally {
      try {dfs.close();} catch (Exception e) {}
      cluster.shutdown();
    }
  }

  public void testRmdir() throws IOException {
    Configuration conf = new Configuration();
    cluster = new MiniDFSCluster(conf, 2, true, null);
    FileSystem fs = cluster.getFileSystem();
    assertTrue("Not a HDFS: " + fs.getUri(),
        fs instanceof DistributedFileSystem);
    try {
      Path directory = new Path("/dir");
      Path tempFile = new Path("/dir/file");

      assertTrue(fs.mkdirs(directory));
      assertTrue(fs.exists(directory));
      writeFile(fs, tempFile);
      assertTrue(fs.exists(tempFile));

      FsShell shell = new FsShell();
      String argv[] = new String[3];
      argv[0] = "-rmdir";      
      argv[1] = "/dir"; 

      int ret = -100;
      try {
        ret = shell.run(argv);
        assertTrue(ret == -1);

        argv[1] = "-ignore-fail-on-non-empty";
        argv[2] = "/dir";
        ret = shell.run(argv);
        assertTrue(ret == 0);
        assertTrue(fs.exists(directory));

        assertTrue(fs.delete(tempFile, true));
        assertFalse(fs.exists(tempFile));

        argv[1] = "/dir";
        argv[2] = "";
        ret = shell.run(argv);
        assertTrue(ret == 0);
        assertFalse(fs.exists(directory));
      } catch (Exception e) {
        System.err.println("Exception raised from DFSShell.run " +
                            e.getLocalizedMessage());
      }
    } finally {
      try { fs.close(); } catch (IOException e) { };
      cluster.shutdown();
    }
  }

  public void testRecrusiveRm() throws IOException {
    Configuration conf = new Configuration();
    cluster = new MiniDFSCluster(conf, 2, true, null);
    FileSystem fs = cluster.getFileSystem();
    assertTrue("Not a HDFS: " + fs.getUri(), 
        fs instanceof DistributedFileSystem);
    try {
      fs.mkdirs(new Path(new Path("parent"), "child"));
      try {
        fs.delete(new Path("parent"), false);
        assert(false); // should never reach here.
      } catch(IOException e) {
         //should have thrown an exception
      }
      try {
        fs.delete(new Path("parent"), true);
      } catch(IOException e) {
        assert(false);
      }
    } finally {
      try { fs.close();}catch(IOException e){};
      cluster.shutdown();
    }
  }

  public void testHeadTail() throws Exception {
    Configuration conf = new Configuration();
    cluster = null;
    PrintStream psBackup = null;
    try {
      cluster = new MiniDFSCluster(conf, 2, true, null);
      FileSystem fs = cluster.getFileSystem();
      psBackup = System.out;
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      PrintStream outPs = new PrintStream(out);
      System.setOut(outPs);

      // create a large file and a small file
      String fileNameLarge = "/head-tail-large";
      String fileNameSmall = "/head-tail-small";
      final Path pathLarge = new Path(fileNameLarge);
      final Path pathSmall = new Path(fileNameSmall);
      int sectLen = 32;
      final byte a[] = new byte[sectLen];
      final byte b[] = new byte[sectLen];
      Arrays.fill(a, (byte)'A');
      Arrays.fill(b, (byte)'B');
      OutputStream filetmp = fs.create(pathSmall);
      filetmp.write(a);
      filetmp.write(b);
      filetmp.close();
      filetmp = fs.create(pathLarge);
      for (int i = 0; i < 32; i++) {
        filetmp.write(a);
      }
      for (int i = 0; i < 32; i++) {
        filetmp.write(b);
      }
      filetmp.close();

      FsShell shell = new FsShell();
      String argv[]= new String[2];
      shell.setConf(conf);

      // head of large file, only contains AA
      argv[0] = "-head";
      argv[1] = fileNameLarge;
      ToolRunner.run(shell, argv);
      String outString = out.toString();
      out.reset();
      assertTrue(outString.contains("AA"));
      assertFalse(outString.contains("BB"));

      // head of smaill file, contains AA and BB
      argv[1] = fileNameSmall;
      ToolRunner.run(shell, argv);
      outString = out.toString();
      out.reset();
      assertTrue(outString.contains("AA"));
      assertTrue(outString.contains("BB"));

      // tail of large file, only contains BB
      argv[0] = "-tail";
      argv[1] = fileNameLarge;
      ToolRunner.run(shell, argv);
      outString = out.toString();
      out.reset();
      assertFalse(outString.contains("AA"));
      assertTrue(outString.contains("BB"));

      // tail of smaill file, contains AA and BB
      argv[1] = fileNameSmall;
      ToolRunner.run(shell, argv);
      outString = out.toString();
      out.reset();
      assertTrue(outString.contains("AA"));
      assertTrue(outString.contains("BB"));

    } finally {
      if (psBackup != null) {
        System.setOut(psBackup);
      }
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  public void testDu() throws IOException {
    Configuration conf = new Configuration();
    cluster = new MiniDFSCluster(conf, 2, true, null);
    FileSystem fs = cluster.getFileSystem();
    assertTrue("Not a HDFS: "+fs.getUri(),
                fs instanceof DistributedFileSystem);
    final DistributedFileSystem dfs = (DistributedFileSystem)fs;
    PrintStream psBackup = System.out;
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    PrintStream psOut = new PrintStream(out);
    System.setOut(psOut);
    FsShell shell = new FsShell();
    shell.setConf(conf);

    try {
      Path myPath = new Path("/test/dir");
      assertTrue(fs.mkdirs(myPath));
      assertTrue(fs.exists(myPath));
      Path myFile = new Path("/test/dir/file");
      writeFile(fs, myFile);
      assertTrue(fs.exists(myFile));
      Path myFile2 = new Path("/test/dir/file2");
      writeFile(fs, myFile2);
      assertTrue(fs.exists(myFile2));

      String[] args = new String[2];
      args[0] = "-du";
      args[1] = "/test/dir";
      int val = -1;
      try {
        val = shell.run(args);
      } catch (Exception e) {
        System.err.println("Exception raised from DFSShell.run " +
                            e.getLocalizedMessage());
      }
      assertTrue(val == 0);
      String returnString = out.toString();
      out.reset();
      // Check if size matchs as expected
      assertTrue(returnString.contains("22"));
      assertTrue(returnString.contains("23"));

    } finally {
      try {dfs.close();} catch (Exception e) {}
      System.setOut(psBackup);
      cluster.shutdown();
    }

  }

  public void testPut() throws Exception {
    Configuration conf = new Configuration();
    cluster = new MiniDFSCluster(conf, 2, true, null);
    FileSystem fs = cluster.getFileSystem();
    assertTrue("Not a HDFS: "+fs.getUri(),
               fs instanceof DistributedFileSystem);
    final DistributedFileSystem dfs = (DistributedFileSystem)fs;

    try {
      // remove left over crc files:
      new File(TEST_ROOT_DIR, ".f1.crc").delete();
      new File(TEST_ROOT_DIR, ".f2.crc").delete();
      final File f1 = createLocalFile(new File(TEST_ROOT_DIR, "f1"));
      final File f2 = createLocalFile(new File(TEST_ROOT_DIR, "f2"));

      final Path root = mkdir(dfs, new Path("/test/put"));
      final Path dst = new Path(root, "dst");

      show("begin");

      final Thread copy2ndFileThread = new Thread() {
        public void run() {
          try {
            show("copy local " + f2 + " to remote " + dst);
            dfs.copyFromLocalFile(false, false, false,
                new Path(f2.getPath()), dst);
          } catch (IOException ioe) {
            show("good " + StringUtils.stringifyException(ioe));
            return;
          }
          //should not be here, must got IOException
          assertTrue(false);
        }
      };

      //use SecurityManager to pause the copying of f1 and begin copying f2
      SecurityManager sm = System.getSecurityManager();
      System.out.println("SecurityManager = " + sm);
      System.setSecurityManager(new SecurityManager() {
        private boolean firstTime = true;

        public void checkPermission(Permission perm) {
          if (firstTime) {
            Thread t = Thread.currentThread();
            if (!t.toString().contains("DataNode")) {
              String s = "" + Arrays.asList(t.getStackTrace());
              if (s.contains("FileUtil.copyContent")) {
                //pause at FileUtil.copyContent

                firstTime = false;
                copy2ndFileThread.start();
                try {Thread.sleep(5000);} catch (InterruptedException e) {}
              }
            }
          }
        }
      });
      show("copy local " + f1 + " to remote " + dst);
      dfs.copyFromLocalFile(false, false, false, new Path(f1.getPath()), dst);
      show("done");

      try {copy2ndFileThread.join();} catch (InterruptedException e) { }
      System.setSecurityManager(sm);

      // copy multiple files to destination directory
      final Path destmultiple = mkdir(dfs, new Path("/test/putmultiple"));
      Path[] srcs = new Path[2];
      srcs[0] = new Path(f1.getPath());
      srcs[1] = new Path(f2.getPath());
      dfs.copyFromLocalFile(false, false, false, srcs, destmultiple);
      srcs[0] = new Path(destmultiple,"f1");
      srcs[1] = new Path(destmultiple,"f2");
      assertTrue(dfs.exists(srcs[0]));
      assertTrue(dfs.exists(srcs[1]));

      // move multiple files to destination directory
      final Path destmultiple2 = mkdir(dfs, new Path("/test/movemultiple"));
      srcs[0] = new Path(f1.getPath());
      srcs[1] = new Path(f2.getPath());
      dfs.moveFromLocalFile(srcs, destmultiple2);
      assertFalse(f1.exists());
      assertFalse(f2.exists());
      srcs[0] = new Path(destmultiple2, "f1");
      srcs[1] = new Path(destmultiple2, "f2");
      assertTrue(dfs.exists(srcs[0]));
      assertTrue(dfs.exists(srcs[1]));

      f1.delete();
      f2.delete();
      
      // Verify that validation option works
      FsShell shell = new FsShell();
      shell.setConf(conf);
      File src = new File(TEST_ROOT_DIR, "f1");
      Path dstFile = new Path(root, "f1");
      try {
        src = createLocalFile(src);
        String[] args = {"-put", "-validate", src.toString(), dstFile.toString()};
        assertEquals(0, shell.run(args));
        dfs.delete(dstFile, true);

        // run it again with injected error
        InjectionHandler.set(new TestHandler());
        assertEquals(-1, shell.run(args));
      } finally {
        InjectionHandler.clear();
        dfs.delete(dstFile, true);
        src.delete();
      }

      // Verify that rate option works in put and get
      // [-put [-validate] [-rate <bandwidth>] <localsrc> ... <dst>]
      // [-get [-rate <bandwidth>] [-crc] <src> <localdst>]
      FileSystem localFs = FileSystem.getLocal(conf);
      Path srcFile = new Path("/tmp/srcFile");
      Path destFile = new Path(TEST_ROOT_DIR, "destFile");
      localFs.delete(srcFile, true);
      fs.delete(destFile, true);
      
      Random random = new Random(1);
      final long seed = random.nextLong();
      random.setSeed(seed);
      
      // generate random data
      final byte[] data = new byte[1024 * 10];
      random.nextBytes(data);
      String str = new String(data);
      writeFileContents(localFs, srcFile, str);

      try {
        // [-put [-validate] [-rate <bandwidth>] <localsrc> ... <dst>]
        // file put size is 10 times of rate
        // throttler will guarantee the put bandwidth <= expectedPutBandwidth
        long expectedPutBandwidth = 1024L;
        long start = System.currentTimeMillis();
        String[] putArgs =
        {"-put", "-rate", "1024", srcFile.toString(), destFile.toString()};
        assertEquals(0, shell.run(putArgs));
        long end = System.currentTimeMillis();
        long actualPutBandwidth =
          fs.getFileStatus(destFile).getLen() * 1000 / (end - start);
        assertTrue(actualPutBandwidth <= expectedPutBandwidth);

        // [-get [-rate <bandwidth>] [-crc] <src> <localdst>]
        // file get size is 10 times of rate
        // throttler will guarantee the get bandwidth <= expectedGetBandwidth
        long expectedGetBandwidth = 2048L;
        start = System.currentTimeMillis();
        String[] getArgs =
        {"-get", "-rate", "2048", destFile.toString(), TEST_ROOT_DIR};
        assertEquals(0, shell.run(getArgs));
        end = System.currentTimeMillis();
        File localFile = new File(TEST_ROOT_DIR, "destFile");
        long actualGetBandwidth =
          localFile.length() * 1000 / (end - start);
        assertTrue(actualGetBandwidth <= expectedGetBandwidth);

        localFile.delete();
      } finally {
        fs.delete(destFile, true);
        localFs.delete(srcFile, true);
      }

    } finally {
      try {dfs.close();} catch (Exception e) {}
      cluster.shutdown();
    }
  }

  /** test undelete */
  public void testUndelete() throws Exception {

    Configuration conf = new Configuration();
    conf.set("fs.trash.interval", "1"); // enable trash interval

    cluster = new MiniDFSCluster(conf, 2, true, null);
    FileSystem fs = cluster.getFileSystem();
    assertTrue("Not a HDFS: " + fs.getUri(),
               fs instanceof DistributedFileSystem);
    try {

      // test undelete of file that was deleted via shell
      {
        Path path = new Path("test_file_1");
        writeFile(fs, path);
        assertTrue(fs.exists(path));

        FsShell shell = new FsShell();
        shell.setConf(conf);
        String argv[] = new String[2];

        argv[0] = "-rm";
        argv[1] = path.toString();
        assertTrue(shell.run(argv) == 0);
        assertTrue(!fs.exists(path));

        argv[0] = "-undelete";
        argv[1] = path.toString();
        assertTrue(shell.run(argv) == 0);
        assertTrue(fs.exists(path));
      }

      // test undelete of file that was deleted programatically
      {
        Path path = new Path("test_file_2");
        writeFile(fs, path);
        assertTrue(fs.exists(path));

        assertTrue(fs.delete(path, true));
        assertTrue(!fs.exists(path));

        FsShell shell = new FsShell();
        shell.setConf(conf);
        String argv[] = new String[2];
        argv[0] = "-undelete";
        argv[1] = path.toString();
        assertTrue(shell.run(argv) == 0);
        assertTrue(fs.exists(path));
      }

      // test undelete of directory that was deleted via shell
      {
        Path path = new Path("test_dir_1");
        assertTrue(fs.mkdirs(path));
        assertTrue(fs.exists(path));

        FsShell shell = new FsShell();
        shell.setConf(conf);
        String argv[] = new String[2];

        argv[0] = "-rmr";
        argv[1] = path.toString();
        assertTrue(shell.run(argv) == 0);
        assertTrue(!fs.exists(path));

        argv[0] = "-undelete";
        argv[1] = path.toString();
        assertTrue(shell.run(argv) == 0);
        assertTrue(fs.exists(path));
      }

      // test undelete of directory that was deleted programatically
      {
        Path path = new Path("test_dir_2");
        assertTrue(fs.mkdirs(path));
        assertTrue(fs.exists(path));

        assertTrue(fs.delete(path, true));
        assertTrue(!fs.exists(path));

        FsShell shell = new FsShell();
        shell.setConf(conf);
        String argv[] = new String[2];
        argv[0] = "-undelete";
        argv[1] = path.toString();
        assertTrue(shell.run(argv) == 0);
        assertTrue(fs.exists(path));
      }

      // test undelete of most recently deleted version
      {
        Path path = new Path("test_file_multiversion");

        writeFileContents(fs, path, "wrong version");
        assertTrue(fs.exists(path));
        assertTrue(fs.delete(path, true));
        assertTrue(!fs.exists(path));

        writeFileContents(fs, path, "right version");
        assertTrue(fs.exists(path));
        assertTrue(fs.delete(path, true));
        assertTrue(!fs.exists(path));

        FsShell shell = new FsShell();
        shell.setConf(conf);
        String argv[] = new String[2];
        argv[0] = "-undelete";
        argv[1] = path.toString();
        assertTrue(shell.run(argv) == 0);
        assertTrue(fs.exists(path));

        String verify = readFileContents(fs, path);
        System.err.println("verify=" + verify);
        assertTrue(verify.equals("right version"));
      }

      // test undelete of file from another user's trash
      {
        // make a fake trash for a different user
        Path path = new Path("my_file_in_joes_trash");
        Path joesTrashDir = new Path(
          "/user/joe/.Trash/Current/user/" + getUserName(fs));
        Path joesTrashFile = new Path(joesTrashDir, path);
        mkdir(fs, joesTrashDir);
        writeFileContents(fs, joesTrashFile, "some file contents");
        assertTrue(fs.exists(joesTrashFile));

        FsShell shell = new FsShell();
        shell.setConf(conf);
        String argv[] = new String[4];
        argv[0] = "-undelete";
        argv[1] = "-u";
        argv[2] = "joe";
        argv[3] = path.toString();
        assertTrue(shell.run(argv) == 0);
        assertTrue(fs.exists(path));
      }

      // test undelete of most recently deleted version across
      // checkpoint intervals
      {
        Path path = new Path("test_file_interval");

        writeFileContents(fs, path, "wrong version");
        assertTrue(fs.exists(path));
        assertTrue(fs.delete(path, true));
        assertTrue(!fs.exists(path));

        writeFileContents(fs, path, "right version");
        assertTrue(fs.exists(path));
        assertTrue(fs.delete(path, true));
        assertTrue(!fs.exists(path));

        // wait for the next interval before checking
        Thread.sleep(60500);

        FsShell shell = new FsShell();
        shell.setConf(conf);
        String argv[] = new String[2];
        argv[0] = "-undelete";
        argv[1] = path.toString();
        assertTrue(shell.run(argv) == 0);
        assertTrue(fs.exists(path));

        String verify = readFileContents(fs, path);
        System.err.println("verify=" + verify);
        assertTrue(verify.equals("right version"));
      }

    } finally {
      try { fs.close(); } catch (IOException e) { };
      cluster.shutdown();
    }
  }


  /** check command error outputs and exit statuses. */
  public void testErrOutPut() throws Exception {
    Configuration conf = new Configuration();
    cluster = null;
    PrintStream bak = null;
    try {
      cluster = new MiniDFSCluster(conf, 2, true, null);
      FileSystem srcFs = cluster.getFileSystem();
      Path root = new Path("/nonexistentfile");
      bak = System.err;
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      PrintStream tmp = new PrintStream(out);
      System.setErr(tmp);
      String[] argv = new String[2];
      argv[0] = "-cat";
      argv[1] = root.toUri().getPath();
      int ret = ToolRunner.run(new FsShell(), argv);
      assertTrue(" -cat returned -1 ", 0>=ret);
      String returned = out.toString();
      assertTrue("cat does not print exceptions ",
          (returned.lastIndexOf("Exception") == -1));
      out.reset();
      argv[0] = "-rm";
      argv[1] = root.toString();
      FsShell shell = new FsShell();
      shell.setConf(conf);
      ret = ToolRunner.run(shell, argv);
      assertTrue(" -rm returned -1 ", 0>=ret);
      returned = out.toString();
      out.reset();
      assertTrue("rm prints reasonable error ",
          (returned.lastIndexOf("No such file or directory") != -1));
      argv[0] = "-rmr";
      argv[1] = root.toString();
      ret = ToolRunner.run(shell, argv);
      assertTrue(" -rmr returned -1", 0>=ret);
      returned = out.toString();
      assertTrue("rmr prints reasonable error ",
          (returned.lastIndexOf("No such file or directory") != -1));
      out.reset();
      argv[0] = "-du";
      argv[1] = "/nonexistentfile";
      ret = ToolRunner.run(shell, argv);
      returned = out.toString();
      assertTrue(" -du prints reasonable error ",
          (returned.lastIndexOf("No such file or directory") != -1));
      out.reset();
      argv[0] = "-dus";
      argv[1] = "/nonexistentfile";
      ret = ToolRunner.run(shell, argv);
      returned = out.toString();
      assertTrue(" -dus prints reasonable error",
          (returned.lastIndexOf("No such file or directory") != -1));
      out.reset();
      argv[0] = "-ls";
      argv[1] = "/nonexistenfile";
      ret = ToolRunner.run(shell, argv);
      returned = out.toString();
      assertTrue(" -ls does not return Found 0 items",
          (returned.lastIndexOf("Found 0") == -1));
      out.reset();
      argv[0] = "-ls";
      argv[1] = "/nonexistentfile";
      ret = ToolRunner.run(shell, argv);
      assertTrue(" -lsr should fail ",
          (ret < 0));
      out.reset();
      srcFs.mkdirs(new Path("/testdir"));
      argv[0] = "-ls";
      argv[1] = "/testdir";
      ret = ToolRunner.run(shell, argv);
      returned = out.toString();
      assertTrue(" -ls does not print out anything ",
          (returned.lastIndexOf("Found 0") == -1));
      out.reset();
      argv[0] = "-ls";
      argv[1] = "/user/nonxistant/*";
      ret = ToolRunner.run(shell, argv);
      assertTrue(" -ls on nonexistent glob returns -1",
          (ret < 0));
      out.reset();
      argv[0] = "-mkdir";
      argv[1] = "/testdir";
      ret = ToolRunner.run(shell, argv);
      returned = out.toString();
      assertTrue(" -mkdir returned -1 ", (ret < 0));
      assertTrue(" -mkdir returned File exists",
          (returned.lastIndexOf("File exists") != -1));
      Path testFile = new Path("/testfile");
      OutputStream outtmp = srcFs.create(testFile);
      outtmp.write(testFile.toString().getBytes());
      outtmp.close();
      out.reset();
      argv[0] = "-mkdir";
      argv[1] = "/testfile";
      ret = ToolRunner.run(shell, argv);
      returned = out.toString();
      assertTrue(" -mkdir returned -1", (ret < 0));
      assertTrue(" -mkdir returned this is a file ",
          (returned.lastIndexOf("not a directory") != -1));
      out.reset();
      argv = new String[3];
      argv[0] = "-mv";
      argv[1] = "/testfile";
      argv[2] = "file";
      ret = ToolRunner.run(shell, argv);
      assertTrue("mv failed to rename", ret == -1);
      out.reset();
      argv = new String[3];
      argv[0] = "-mv";
      argv[1] = "/testfile";
      argv[2] = "/testfiletest";
      ret = ToolRunner.run(shell, argv);
      returned = out.toString();
      assertTrue("no output from rename",
          (returned.lastIndexOf("Renamed") == -1));
      out.reset();
      argv[0] = "-mv";
      argv[1] = "/testfile";
      argv[2] = "/testfiletmp";
      ret = ToolRunner.run(shell, argv);
      returned = out.toString();
      assertTrue(" unix like output",
          (returned.lastIndexOf("No such file or") != -1));
      out.reset();
      argv = new String[1];
      argv[0] = "-du";
      srcFs.mkdirs(srcFs.getHomeDirectory());
      ret = ToolRunner.run(shell, argv);
      returned = out.toString();
      assertTrue(" no error ", (ret == 0));
      assertTrue("empty path specified",
          (returned.lastIndexOf("empty string") == -1));
    } finally {
      if (bak != null) {
        System.setErr(bak);
      }
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }


  public void testURIPaths() throws Exception {
    Configuration srcConf = new Configuration();
    Configuration dstConf = new Configuration();
    MiniDFSCluster srcCluster =  null;
    MiniDFSCluster dstCluster = null;
    String bak = "/tmp/sdong";
//    String bak = System.getProperty("test.build.data");
    try{
      srcCluster = new MiniDFSCluster(srcConf, 2, true, null);
      File nameDir = new File(new File(bak), "dfs_tmp_uri/");
      nameDir.mkdirs();
      System.setProperty("test.build.data", nameDir.toString());
      dstCluster = new MiniDFSCluster(dstConf, 2, true, null);
      FileSystem srcFs = srcCluster.getFileSystem();
      FileSystem dstFs = dstCluster.getFileSystem();
      FsShell shell = new FsShell();
      shell.setConf(srcConf);
      //check for ls
      String[] argv = new String[2];
      argv[0] = "-ls";
      argv[1] = dstFs.getUri().toString() + "/";
      int ret = ToolRunner.run(shell, argv);
      assertTrue("ls works on remote uri ", (ret==0));
      //check for rm -r
      dstFs.mkdirs(new Path("/hadoopdir"));
      argv = new String[2];
      argv[0] = "-rmr";
      argv[1] = dstFs.getUri().toString() + "/hadoopdir";
      ret = ToolRunner.run(shell, argv);
      assertTrue("-rmr works on remote uri " + argv[1], (ret==0));
      //check du
      argv[0] = "-du";
      argv[1] = dstFs.getUri().toString() + "/";
      ret = ToolRunner.run(shell, argv);
      assertTrue("du works on remote uri ", (ret ==0));
      //check put
      File furi = new File(TEST_ROOT_DIR, "furi");
      createLocalFile(furi);
      argv = new String[3];
      argv[0] = "-put";
      argv[1] = furi.toString();
      argv[2] = dstFs.getUri().toString() + "/furi";
      ret = ToolRunner.run(shell, argv);
      assertTrue(" put is working ", (ret==0));
      //check cp
      argv[0] = "-cp";
      argv[1] = dstFs.getUri().toString() + "/furi";
      argv[2] = srcFs.getUri().toString() + "/furi";
      ret = ToolRunner.run(shell, argv);
      assertTrue(" cp is working ", (ret==0));
      assertTrue(srcFs.exists(new Path("/furi")));
      //check cat
      argv = new String[2];
      argv[0] = "-cat";
      argv[1] = dstFs.getUri().toString() + "/furi";
      ret = ToolRunner.run(shell, argv);
      assertTrue(" cat is working ", (ret == 0));
      //check chown
      dstFs.delete(new Path("/furi"), true);
      dstFs.delete(new Path("/hadoopdir"), true);
      String file = "/tmp/chownTest";
      Path path = new Path(file);
      Path parent = new Path("/tmp");
      Path root = new Path("/");
      TestDFSShell.writeFile(dstFs, path);
      FileStatus oldStatus = dstFs.getFileStatus(path);
      runCmd(shell, "-chgrp", "-R", "herbivores", dstFs.getUri().toString() +"/*");
      confirmOwner(null, "herbivores", oldStatus, dstFs, parent, path);
      oldStatus = dstFs.getFileStatus(root);
      runCmd(shell, "-chown", "-R", ":reptiles", dstFs.getUri().toString() + "/");
      confirmOwner(null, "reptiles", oldStatus, dstFs, root, parent, path);
      //check if default hdfs:/// works
      argv[0] = "-cat";
      argv[1] = "hdfs:///furi";
      ret = ToolRunner.run(shell, argv);
      assertTrue(" default works for cat", (ret == 0));
      argv[0] = "-ls";
      argv[1] = "hdfs:///";
      ret = ToolRunner.run(shell, argv);
      assertTrue("default works for ls ", (ret == 0));
      argv[0] = "-rmr";
      argv[1] = "hdfs:///furi";
      ret = ToolRunner.run(shell, argv);
      assertTrue("default works for rm/rmr", (ret ==0));
    } finally {
      System.setProperty("test.build.data", bak);
      if (null != srcCluster) {
        srcCluster.shutdown();
      }
      if (null != dstCluster) {
        dstCluster.shutdown();
      }
    }
  }

  public void testHardLink() throws Exception {
    Configuration conf = new Configuration();
    cluster = null;
    FileSystem fs = null;
    try {
      cluster = new MiniDFSCluster(conf, 2, true, null);
      fs = cluster.getFileSystem();
      String topDir = "/testHardLink";
      DFSTestUtil util = new DFSTestUtil(topDir, 10, 1, 1024);
      util.createFiles(fs, topDir);
      String[] fileNames = util.getFileNames(topDir);

      FsShell shell = new FsShell(conf);

      String dir = "/somedirectoryforhardlinktesting";
      fs.mkdirs(new Path(dir));


      String[] cmd = { "-hardlink", fileNames[0], fileNames[0] + "hardlink" };
      assertEquals(0, ToolRunner.run(shell, cmd));

      String[] cmd0 = { "-hardlink", fileNames[0], fileNames[0] + "hardlink1" };
      assertEquals(0, ToolRunner.run(shell, cmd0));

      String[] getFilesCmd = { "-showlinks", fileNames[0] };
      assertEquals(0, ToolRunner.run(shell, getFilesCmd));

      String[] getFilesCmd1 = { "-showlinks", "/nonexistentfile" };
      assertEquals(-1, ToolRunner.run(shell, getFilesCmd1));

      String[] getFilesCmd2 = { "-showlinks" };
      assertEquals(-1, ToolRunner.run(shell, getFilesCmd2));

      String[] getFilesCmd3 = { "-showlinks", dir };
      assertEquals(-1, ToolRunner.run(shell, getFilesCmd3));

      String[] getFilesCmd4 = { "-showlinks", fileNames[0], fileNames[1],
          fileNames[0] + "hardlink" };
      assertEquals(0, ToolRunner.run(shell, getFilesCmd4));

      FileStatusExtended stat1 = cluster.getNameNode().namesystem
          .getFileInfoExtended(fileNames[0]);
      FileStatusExtended stat2 = cluster.getNameNode().namesystem
          .getFileInfoExtended(fileNames[0] + "hardlink");
      assertTrue(Arrays.equals(stat1.getBlocks(), stat2.getBlocks()));
      assertEquals(stat1.getAccessTime(), stat2.getAccessTime());
      assertEquals(stat1.getBlockSize(), stat2.getBlockSize());
      assertEquals(stat1.getGroup(), stat2.getGroup());
      assertEquals(stat1.getLen(), stat2.getLen());
      assertEquals(stat1.getModificationTime(), stat2.getModificationTime());
      assertEquals(stat1.getOwner(), stat2.getOwner());
      assertEquals(stat1.getReplication(), stat2.getReplication());

      String[] cmd1 = { "-hardlink", fileNames[0], fileNames[1] };
      assertEquals(-1, ToolRunner.run(shell, cmd1));

      String[] cmd2 = { "-hardlink", fileNames[0], dir};
      assertEquals(-1, ToolRunner.run(shell, cmd2));

      String[] cmd3 = { "-hardlink", fileNames[0], null };
      assertEquals(-1, ToolRunner.run(shell, cmd3));

      String[] cmd4 = { "-hardlink", fileNames[0], fileNames[1], fileNames[2] };
      assertEquals(-1, ToolRunner.run(shell, cmd4));

      String[] cmd5 = { "-hardlink", fileNames[0] };
      assertEquals(-1, ToolRunner.run(shell, cmd5));
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
      if (fs != null) {
        fs.close();
      }
    }

  }

  public void testText() throws Exception {
    Configuration conf = new Configuration();
    cluster = null;
    PrintStream bak = null;
    try {
      cluster = new MiniDFSCluster(conf, 2, true, null);
      FileSystem fs = cluster.getFileSystem();
      Path root = new Path("/texttest");
      fs.mkdirs(root);
      OutputStream zout = new GZIPOutputStream(
          fs.create(new Path(root, "file.gz")));
      Random r = new Random();
      ByteArrayOutputStream file = new ByteArrayOutputStream();
      for (int i = 0; i < 1024; ++i) {
        char c = Character.forDigit(r.nextInt(26) + 10, 36);
        file.write(c);
        zout.write(c);
      }
      zout.close();

      bak = System.out;
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      System.setOut(new PrintStream(out));

      String[] argv = new String[2];
      argv[0] = "-text";
      argv[1] = new Path(root, "file.gz").toUri().getPath();
      int ret = ToolRunner.run(new FsShell(), argv);
      assertTrue("-text returned -1", 0 >= ret);
      file.reset();
      out.reset();
      assertTrue("Output doesn't match input",
          Arrays.equals(file.toByteArray(), out.toByteArray()));

    } finally {
      if (null != bak) {
        System.setOut(bak);
      }
      if (null != cluster) {
        cluster.shutdown();
      }
    }
  }

  public void testCopyToLocal() throws Exception {
    Configuration conf = new Configuration();
    /* This tests some properties of ChecksumFileSystem as well.
     * Make sure that we create ChecksumDFS */
    conf.set("fs.hdfs.impl",
             "org.apache.hadoop.hdfs.ChecksumDistributedFileSystem");
    cluster = new MiniDFSCluster(conf, 2, true, null);
    FileSystem fs = cluster.getFileSystem();
    assertTrue("Not a HDFS: "+fs.getUri(),
               fs instanceof ChecksumDistributedFileSystem);
    ChecksumDistributedFileSystem dfs = (ChecksumDistributedFileSystem)fs;
    FsShell shell = new FsShell();
    shell.setConf(conf);

    try {
      String root = createTree(dfs, "copyToLocal");

      // Verify copying the tree
      {
        try {
          assertEquals(0,
              runCmd(shell, "-copyToLocal", root + "*", TEST_ROOT_DIR));
        } catch (Exception e) {
          System.err.println("Exception raised from DFSShell.run " +
                             e.getLocalizedMessage());
        }

        File localroot = new File(TEST_ROOT_DIR, "copyToLocal");
        File localroot2 = new File(TEST_ROOT_DIR, "copyToLocal2");

        File f1 = new File(localroot, "f1");
        assertTrue("Copying failed.", f1.isFile());

        File f2 = new File(localroot, "f2");
        assertTrue("Copying failed.", f2.isFile());

        File sub = new File(localroot, "sub");
        assertTrue("Copying failed.", sub.isDirectory());

        File f3 = new File(sub, "f3");
        assertTrue("Copying failed.", f3.isFile());

        File f4 = new File(sub, "f4");
        assertTrue("Copying failed.", f4.isFile());

        File f5 = new File(localroot2, "f1");
        assertTrue("Copying failed.", f5.isFile());

        f1.delete();
        f2.delete();
        f3.delete();
        f4.delete();
        f5.delete();
        sub.delete();
      }
      // Verify copying non existing sources do not create zero byte
      // destination files
      {
        String[] args = {"-copyToLocal", "nosuchfile", TEST_ROOT_DIR};
        try {
          assertEquals(-1, shell.run(args));
        } catch (Exception e) {
          System.err.println("Exception raised from DFSShell.run " +
                            e.getLocalizedMessage());
        }
        File f6 = new File(TEST_ROOT_DIR, "nosuchfile");
        assertTrue(!f6.exists());
      }
      
      // Verify that validation option works
      Path src = new Path(root, "f1");
      File f1 = new File(TEST_ROOT_DIR, "f1");
      try {
        writeFile(fs, src);
        String[] args = {"-copyToLocal", "-validate", src.toString(), TEST_ROOT_DIR};
        assertEquals(0, shell.run(args));
        f1.delete();

        // run it again with injected error
        InjectionHandler.set(new TestHandler());
        assertEquals(-1, shell.run(args));
      } finally {
        InjectionHandler.clear();
        fs.delete(src, true);
        f1.delete();
      }
    } finally {
      try {
        dfs.close();
      } catch (Exception e) {
      }
      cluster.shutdown();
    }
  }

  class TestHandler extends org.apache.hadoop.util.InjectionHandler {
    @Override
    public void _processEventIO(InjectionEventI event, Object... args) 
    throws IOException {
      if (event != InjectionEventCore.FILE_TRUNCATION) {
        return;
      }
      // arg0 is the file system and arg1 is the file to be truncated
      assertEquals(2, args.length);
      if (args[0] instanceof LocalFileSystem || 
          args[0] instanceof RawLocalFileSystem ) {
        // truncate the local file
        RandomAccessFile raFile = new RandomAccessFile((File)args[1], "rw");
        try {
          FileChannel channel = raFile.getChannel();
          long filesize = channel.size();
          assertFalse(0 == filesize);
          channel.truncate(filesize-1);
        } finally {
          raFile.close();
        }
      } else { // HDFS: decrement last block size
        assertTrue(args[0] instanceof DistributedFileSystem);
        String file = ((Path) args[1]).toUri().getPath();
        FSNamesystem namesystem = cluster.getNameNode().getNamesystem();
        
        INode [] trgINodes =  namesystem.dir.getExistingPathINodes(file);
        INodeFile trgInode = (INodeFile) trgINodes[trgINodes.length-1];
        int numBlocks = trgInode.getBlocks().length;
        Block lastBlock = trgInode.getBlocks()[numBlocks-1];
        lastBlock.setNumBytes(lastBlock.getNumBytes()-1);
      }
    }
  }

  static String createTree(FileSystem fs, String name) throws IOException {
    // create a tree
    //   ROOT
    //   |- f1
    //   |- f2
    //   + sub
    //      |- f3
    //      |- f4
    //   ROOT2
    //   |- f1
    String path = "/test/" + name;
    Path root = mkdir(fs, new Path(path));
    Path sub = mkdir(fs, new Path(root, "sub"));
    Path root2 = mkdir(fs, new Path(path + "2"));

    writeFile(fs, new Path(root, "f1"));
    writeFile(fs, new Path(root, "f2"));
    writeFile(fs, new Path(sub, "f3"));
    writeFile(fs, new Path(sub, "f4"));
    writeFile(fs, new Path(root2, "f1"));
    mkdir(fs, new Path(root2, "sub"));
    return path;
  }

  public void testCount() throws Exception {
    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
    DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
    FsShell shell = new FsShell();
    shell.setConf(conf);

    try {
      String root = createTree(dfs, "count");

      // Verify the counts
      runCount(root, 2, 4, conf);
      runCount(root + "2", 2, 1, conf);
      runCount(root + "2/f1", 0, 1, conf);
      runCount(root + "2/sub", 1, 0, conf);

      final FileSystem localfs = FileSystem.getLocal(conf);
      Path localpath = new Path(TEST_ROOT_DIR, "testcount");
      localpath = localpath.makeQualified(localfs);
      localfs.mkdirs(localpath);

      final String localstr = localpath.toString();
      System.out.println("localstr=" + localstr);
      runCount(localstr, 1, 0, conf);
      assertEquals(0, new Count(new String[]{root, localstr}, 0, conf).runAll());
    } finally {
      try {
        dfs.close();
      } catch (Exception e) {
      }
      cluster.shutdown();
    }
  }
  private void runCount(String path, long dirs, long files, Configuration conf
                       ) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    PrintStream out = new PrintStream(bytes);
    PrintStream oldOut = System.out;
    System.setOut(out);
    Scanner in = null;
    String results = null;
    try {
      new Count(new String[]{path}, 0, conf).runAll();
      results = bytes.toString();
      in = new Scanner(results);
      assertEquals(dirs, in.nextLong());
      assertEquals(files, in.nextLong());
    } finally {
      if (in!=null) in.close();
      IOUtils.closeStream(out);
      System.setOut(oldOut);
      System.out.println("results:\n" + results);
    }
  }

  //throws IOException instead of Exception as shell.run() does.
  private int runCmd(FsShell shell, String... args) throws IOException {
    try {
      return shell.run(args);
    } catch (IOException e) {
      throw e;
    } catch (RuntimeException e) {
      throw e;
    } catch (Exception e) {
      throw new IOException(StringUtils.stringifyException(e));
    }
  }

  /**
   * Test chmod.
   */
  void testChmod(Configuration conf, FileSystem fs, String chmodDir)
    throws IOException {
    FsShell shell = new FsShell();
    shell.setConf(conf);

    try {
      //first make dir
      Path dir = new Path(chmodDir);
      fs.delete(dir, true);
      fs.mkdirs(dir);

      runCmd(shell, "-chmod", "u+rwx,g=rw,o-rwx", chmodDir);
      assertEquals("rwxrw----",
                   fs.getFileStatus(dir).getPermission().toString());

      //create an empty file
      Path file = new Path(chmodDir, "file");
      TestDFSShell.writeFile(fs, file);

      //test octal mode
      runCmd(shell, "-chmod", "644", file.toString());
      assertEquals("rw-r--r--",
                   fs.getFileStatus(file).getPermission().toString());

      //test recursive
      runCmd(shell, "-chmod", "-R", "a+rwX", chmodDir);
      assertEquals("rwxrwxrwx",
                   fs.getFileStatus(dir).getPermission().toString());
      assertEquals("rw-rw-rw-",
                   fs.getFileStatus(file).getPermission().toString());

      fs.delete(dir, true);
    } finally {
      try {
        fs.close();
        shell.close();
      } catch (IOException ignored) {}
    }
  }

  private void confirmOwner(String owner, String group, FileStatus oldStatus,
                            FileSystem fs, Path... paths) throws IOException {
    for(Path path : paths) {
      FileStatus newStatus= fs.getFileStatus(path);
      if (owner != null) {
        assertEquals(owner, newStatus.getOwner());
      } else {
        assertEquals(oldStatus.getOwner(), newStatus.getOwner()); // should not change
      }
      if (group != null) {
        assertEquals(group, newStatus.getGroup());
      } else {
        assertEquals(oldStatus.getGroup(), newStatus.getGroup()); // should not change
      }
    }
  }

  public void testFilePermissions() throws IOException {
    Configuration conf = new Configuration();

    //test chmod on local fs
    FileSystem fs = FileSystem.getLocal(conf);
    testChmod(conf, fs,
              (new File(TEST_ROOT_DIR, "chmodTest")).getAbsolutePath());

    conf.set("dfs.permissions", "true");

    //test chmod on DFS
    cluster = new MiniDFSCluster(conf, 2, true, null);
    
    try {
      fs = cluster.getFileSystem();
      testChmod(conf, fs, "/tmp/chmodTest");

      // test chown and chgrp on DFS:

      FsShell shell = new FsShell();
      shell.setConf(conf);
      fs = cluster.getFileSystem();

      /*
       * For dfs, I am the super user and I can change ower of any file to
       * anything. "-R" option is already tested by chmod test above.
       */

      String file = "/tmp/chownTest";
      Path path = new Path(file);
      Path parent = new Path("/tmp");
      Path root = new Path("/");
      TestDFSShell.writeFile(fs, path);

      FileStatus oldStatus = fs.getFileStatus(path);
      runCmd(shell, "-chgrp", "-R", "herbivores", "/*", "unknownFile*");
      confirmOwner(null, "herbivores", oldStatus, fs, parent, path);

      oldStatus = fs.getFileStatus(path);
      runCmd(shell, "-chgrp", "mammals", file);
      confirmOwner(null, "mammals", oldStatus, fs, path);

      oldStatus = fs.getFileStatus(path);
      runCmd(shell, "-chown", "-R", ":reptiles", "/");
      confirmOwner(null, "reptiles", oldStatus, fs, root, parent, path);

      oldStatus = fs.getFileStatus(path);
      runCmd(shell, "-chown", "python:", "/nonExistentFile", file);
      confirmOwner("python", "reptiles", oldStatus, fs, path);

      oldStatus = fs.getFileStatus(path);
      runCmd(shell, "-chown", "-R", "hadoop:toys", "unknownFile", "/");
      confirmOwner("hadoop", "toys", oldStatus, fs, root, parent, path);

      // Test different characters in names
      oldStatus = fs.getFileStatus(path);
      runCmd(shell, "-chown", "hdfs.user", file);
      confirmOwner("hdfs.user", null, oldStatus, fs, path);

      oldStatus = fs.getFileStatus(path);
      runCmd(shell, "-chown", "_Hdfs.User-10:_hadoop.users--", file);
      confirmOwner("_Hdfs.User-10", "_hadoop.users--", oldStatus, fs, path);

      oldStatus = fs.getFileStatus(path);
      runCmd(shell, "-chown", "hdfs/hadoop-core@apache.org:asf-projects", file);
      confirmOwner("hdfs/hadoop-core@apache.org", "asf-projects", oldStatus, fs, path);

      oldStatus = fs.getFileStatus(path);
      runCmd(shell, "-chgrp", "hadoop-core@apache.org/100", file);
      confirmOwner(null, "hadoop-core@apache.org/100", oldStatus, fs, path);

    } finally {
      cluster.shutdown();
    }
  }

  // If target == null, perform fuzzy check for current time to prevent problems
  public void assertTimeCorrect(String msg, long time, Date target) {
    final long fuzzyMsThreshold = 10000; // 10 sec
    if (target == null) {
      long targetTime = (new Date()).getTime();
      assertTrue(msg, targetTime >= time &&
                 targetTime <= time + fuzzyMsThreshold);
    } else {
      assertTrue(msg, time == target.getTime());
    }
  }

  public void assertTimesCorrect(String msg, FileSystem fs, Path file,
                                 Date atime, Date mtime) throws IOException {
    FileStatus status = fs.getFileStatus(file);
    assertTimeCorrect(msg + ": atime is incorrect",
                      status.getAccessTime(), atime);
    assertTimeCorrect(msg + ": mtime is incorrect",
                      status.getModificationTime(), mtime);
  }

  public void testTouch() throws IOException, ParseException {
    Configuration conf = new Configuration();
    conf.set("dfs.access.time.precision", "100");
    cluster = new MiniDFSCluster(conf, 2, true, null);
    FileSystem fs = cluster.getFileSystem();
    assertTrue("Not a HDFS: " + fs.getUri(),
               fs instanceof DistributedFileSystem);

    FsShell shell = new FsShell();
    shell.setConf(conf);

    try {
      SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

      // Test file creation
      Path file1 = new Path("/tmp/file1.txt");
      runCmd(shell, "-touch", "" + file1);
      assertTrue("Touch didn't create a file!", fs.exists(file1));
      assertTimesCorrect("Incorrect time for " + file1, fs, file1, null, null);

      // Verify that "-d" option works correctly
      String targetDate = "2001-02-03 04:05:06";
      Date d = df.parse(targetDate);
      // short format
      runCmd(shell, "-touch", "-d", targetDate, "" + file1);
      assertTimesCorrect("-touch -d didn't work", fs, file1, d, d);

      targetDate = "2002-02-02 02:02:02";
      d = df.parse(targetDate);
      // long format
      runCmd(shell, "-touch", "--date", targetDate, "" + file1);
      assertTimesCorrect("-touch --date didn't work", fs, file1, d, d);

      targetDate = "2003-03-03 03:03:03";
      d = df.parse(targetDate);
      // long format #2
      runCmd(shell, "-touch", "--date=" + targetDate, "" + file1);
      assertTimesCorrect("-touch --date didn't work", fs, file1, d, d);

      // Verify that touch sets current time by default
      runCmd(shell, "-touch", "" + file1);
      assertTimesCorrect("-touch didn't set current time", fs, file1, null, null);
      
      // Verify that "-c" works correctly
      Path file2 = new Path("/tmp/file2.txt");
      int exitCode = runCmd(shell, "-touch", "-c", "" + file2);
      assertTrue("-touch -c didn't return error", exitCode != 0);
      assertTrue("-touch -c created file", !fs.exists(file2));
      // Create file with stale atime&mtime
      targetDate = "1999-09-09 09:09:09";
      d = df.parse(targetDate);
      runCmd(shell, "-touch", "-d", targetDate, "" + file2);
      assertTimesCorrect("-touch -d didn't work", fs, file2, d, d);
      // Verify that "-touch -c" updated times correctly
      exitCode = runCmd(shell, "-touch", "-c", "" + file2);
      assertTrue("-touch -c failed on existing file", exitCode == 0);
      assertTimesCorrect("-touch -c didn't update file times", fs, file2, null, null);

      // Verify that "-a" and "-m" work correctly
      String date1 = "2001-01-01 01:01:01";
      String date2 = "2002-02-02 02:02:02";
      Date d1 = df.parse(date1);
      Date d2 = df.parse(date2);
      Date oldFile1Mtime = new Date(fs.getFileStatus(file1).getModificationTime());
      runCmd(shell, "-touch", "-a", "--date", date1, "" + file1);
      assertTimesCorrect("Option -a didn't work", fs, file1, d1, oldFile1Mtime);
      runCmd(shell, "-touch", "-m", "--date", date2, "" + file1);
      assertTimesCorrect("Option -m didn't work", fs, file1, d1, d2);
      Date oldFile2Atime = new Date(fs.getFileStatus(file2).getAccessTime());
      runCmd(shell, "-touch", "-m", "--date", date1, "" + file2);
      assertTimesCorrect("Option -m didn't work", fs, file2, oldFile2Atime, d1);
      runCmd(shell, "-touch", "-a", "--date", date2, "" + file2);
      assertTimesCorrect("Option -a didn't work", fs, file2, d2, d1);
      runCmd(shell, "-touch", "-au", Long.toString(d1.getTime()), "" + file2);
      assertTimesCorrect("Option -a and -u didn't work", fs, file2, d1, d1);
      runCmd(shell, "-touch", "-amu", Long.toString(d2.getTime()), "" + file2);
      assertTimesCorrect("Option -a, -m and -u didn't work", fs, file2, d2, d2);

      // Verify that touch "-m" could set directory time correctly
      // empty directory
      String date3 = "2003-03-03 03:03:03";
      String date4 = "2002-04-04 04:04:04";
      Date d3 = df.parse(date3);
      Date d4 = df.parse(date4);
      Path dir = new Path("/tmp/testDir");
      fs.mkdirs(dir);
      Date dirCreateTime = new Date(fs.getFileStatus(dir).getModificationTime());
      runCmd(shell, "-touch", "-a", "--date", date1, "" + dir);
      assertTimesCorrect("Option -a didn't work with empty directory", fs, dir, d1, dirCreateTime);
      runCmd(shell, "-touch", "-m", "--date", date2, "" + dir);
      assertTimesCorrect("Option -m didn't work with empty directory", fs, dir, d1, d2);

      // non-empty directory
      Path testFile = new Path("/tmp/testDir/testFile");
      runCmd(shell, "-touchz", "" + testFile);
      runCmd(shell, "-touch", "-a", "--date", date3, "" + dir);
      Date fileCreateTime = new Date(fs.getFileStatus(dir).getModificationTime());
      assertTimesCorrect("Option -a didn't work with non-empty directory", fs, dir, d3, fileCreateTime);
      runCmd(shell, "-touch", "-m", "--date", date4, "" + dir);
      assertTimesCorrect("Option -m didn't work with non-empty directory", fs, dir, d3, d4);

    } finally {
      try {
        fs.close();
      } catch (Exception e) {
      }
      cluster.shutdown();
    }
  }

  // Test touch on the default/non-default nameservice in a federated cluster
  public void testTouchFederation() throws IOException, ParseException {
    Configuration conf = new Configuration();
    int numNamenodes = 2;
    int numDatanodes = 2;
    cluster = new MiniDFSCluster(conf, numDatanodes, true, null, numNamenodes);
    cluster.waitActive();

    // f1, f2 are non-default nameservice
    FileSystem fs1 = cluster.getFileSystem(0);
    FileSystem fs2 = cluster.getFileSystem(1);
    // f3 is the default nameservice
    FileSystem fs3 = FileSystem.get(conf);
    FsShell shell = new FsShell();
    shell.setConf(conf);

    try {
      Path file1 = new Path(fs1.getUri() + "/tmp/federateFile1.txt");
      Path file2 = new Path(fs2.getUri() + "/tmp/federateFile2.txt");
      Path file3 = new Path("/tmp/federateFile3.txt");
      runCmd(shell, "-touch", "" + file1, "" + file2, "" + file3);
      assertTrue("Touch didn't create a file!", fs1.exists(file1));
      assertTrue("Touch didn't create a file!", fs2.exists(file2));
      assertTrue("Touch didn't create a file!", fs3.exists(file3));
    } finally {
      try {
        fs1.close();
        fs2.close();
        fs3.close();
      } catch (Exception e) {
      }
      cluster.shutdown();
    }
  }

  /**
   * Tests various options of DFSShell.
   */
  public void testDFSShell() throws IOException {
    Configuration conf = new Configuration();
    /* This tests some properties of ChecksumFileSystem as well.
     * Make sure that we create ChecksumDFS */
    conf.set("fs.hdfs.impl",
             "org.apache.hadoop.hdfs.ChecksumDistributedFileSystem");
    cluster = new MiniDFSCluster(conf, 2, true, null);
    FileSystem fs = cluster.getFileSystem();
    assertTrue("Not a HDFS: "+fs.getUri(),
            fs instanceof ChecksumDistributedFileSystem);
    ChecksumDistributedFileSystem fileSys = (ChecksumDistributedFileSystem)fs;
    FsShell shell = new FsShell();
    shell.setConf(conf);

    try {
      // First create a new directory with mkdirs
      Path myPath = new Path("/test/mkdirs");
      assertTrue(fileSys.mkdirs(myPath));
      assertTrue(fileSys.exists(myPath));
      assertTrue(fileSys.mkdirs(myPath));

      // Second, create a file in that directory.
      Path myFile = new Path("/test/mkdirs/myFile");
      writeFile(fileSys, myFile);
      assertTrue(fileSys.exists(myFile));
      Path myFile2 = new Path("/test/mkdirs/myFile2");
      writeFile(fileSys, myFile2);
      assertTrue(fileSys.exists(myFile2));

      // Verify that rm with a pattern
      {
        String[] args = new String[2];
        args[0] = "-rm";
        args[1] = "/test/mkdirs/myFile*";
        int val = -1;
        try {
          val = shell.run(args);
        } catch (Exception e) {
          System.err.println("Exception raised from DFSShell.run " +
                             e.getLocalizedMessage());
        }
        assertTrue(val == 0);
        assertFalse(fileSys.exists(myFile));
        assertFalse(fileSys.exists(myFile2));

        //re-create the files for other tests
        writeFile(fileSys, myFile);
        assertTrue(fileSys.exists(myFile));
        writeFile(fileSys, myFile2);
        assertTrue(fileSys.exists(myFile2));
      }

      // Verify that we can read the file
      {
        String[] args = new String[3];
        args[0] = "-cat";
        args[1] = "/test/mkdirs/myFile";
        args[2] = "/test/mkdirs/myFile2";
        int val = -1;
        try {
          val = shell.run(args);
        } catch (Exception e) {
          System.err.println("Exception raised from DFSShell.run: " +
                             StringUtils.stringifyException(e));
        }
        assertTrue(val == 0);
      }
      fileSys.delete(myFile2, true);

      // Verify that we can get with and without crc
      {
        File testFile = new File(TEST_ROOT_DIR, "mkdirs/myFile");
        File checksumFile = new File(fileSys.getChecksumFile(
                                                             new Path(testFile.getAbsolutePath())).toString());
        testFile.delete();
        checksumFile.delete();

        String[] args = new String[3];
        args[0] = "-get";
        args[1] = "/test/mkdirs";
        args[2] = TEST_ROOT_DIR;
        int val = -1;
        try {
          val = shell.run(args);
        } catch (Exception e) {
          System.err.println("Exception raised from DFSShell.run " +
                             e.getLocalizedMessage());
        }
        assertTrue(val == 0);
        assertTrue("Copying failed.", testFile.exists());
        assertTrue("Checksum file " + checksumFile+" is copied.", !checksumFile.exists());
        testFile.delete();
      }
      {
        File testFile = new File(TEST_ROOT_DIR, "mkdirs/myFile");
        File checksumFile = new File(fileSys.getChecksumFile(
                                                             new Path(testFile.getAbsolutePath())).toString());
        testFile.delete();
        checksumFile.delete();

        String[] args = new String[4];
        args[0] = "-get";
        args[1] = "-crc";
        args[2] = "/test/mkdirs";
        args[3] = TEST_ROOT_DIR;
        int val = -1;
        try {
          val = shell.run(args);
        } catch (Exception e) {
          System.err.println("Exception raised from DFSShell.run " +
                             e.getLocalizedMessage());
        }
        assertTrue(val == 0);

        assertTrue("Copying data file failed.", testFile.exists());
        assertTrue("Checksum file " + checksumFile+" not copied.", checksumFile.exists());
        testFile.delete();
        checksumFile.delete();
      }
      // Verify that we get an error while trying to read an nonexistent file
      {
        String[] args = new String[2];
        args[0] = "-cat";
        args[1] = "/test/mkdirs/myFile1";
        int val = -1;
        try {
          val = shell.run(args);
        } catch (Exception e) {
          System.err.println("Exception raised from DFSShell.run " +
                             e.getLocalizedMessage());
        }
        assertTrue(val != 0);
      }

      // Verify that we get an error while trying to delete an nonexistent file
      {
        String[] args = new String[2];
        args[0] = "-rm";
        args[1] = "/test/mkdirs/myFile1";
        int val = -1;
        try {
          val = shell.run(args);
        } catch (Exception e) {
          System.err.println("Exception raised from DFSShell.run " +
                             e.getLocalizedMessage());
        }
        assertTrue(val != 0);
      }

      // Verify that we succeed in removing the file we created
      {
        String[] args = new String[2];
        args[0] = "-rm";
        args[1] = "/test/mkdirs/myFile";
        int val = -1;
        try {
          val = shell.run(args);
        } catch (Exception e) {
          System.err.println("Exception raised from DFSShell.run " +
                             e.getLocalizedMessage());
        }
        assertTrue(val == 0);
      }

      // Verify touch/test
      {
        String[] args = new String[2];
        args[0] = "-touchz";
        args[1] = "/test/mkdirs/noFileHere";
        int val = -1;
        try {
          val = shell.run(args);
        } catch (Exception e) {
          System.err.println("Exception raised from DFSShell.run " +
                             e.getLocalizedMessage());
        }
        assertTrue(val == 0);

        args = new String[3];
        args[0] = "-test";
        args[1] = "-e";
        args[2] = "/test/mkdirs/noFileHere";
        val = -1;
        try {
          val = shell.run(args);
        } catch (Exception e) {
          System.err.println("Exception raised from DFSShell.run " +
                             e.getLocalizedMessage());
        }
        assertTrue(val == 0);
      }

      // Verify that cp from a directory to a subdirectory fails
      {
        String[] args = new String[2];
        args[0] = "-mkdir";
        args[1] = "/test/dir1";
        int val = -1;
        try {
          val = shell.run(args);
        } catch (Exception e) {
          System.err.println("Exception raised from DFSShell.run " +
                             e.getLocalizedMessage());
        }
        assertTrue(val == 0);

        // this should fail
        String[] args1 = new String[3];
        args1[0] = "-cp";
        args1[1] = "/test/dir1";
        args1[2] = "/test/dir1/dir2";
        val = 0;
        try {
          val = shell.run(args1);
        } catch (Exception e) {
          System.err.println("Exception raised from DFSShell.run " +
                             e.getLocalizedMessage());
        }
        assertTrue(val == -1);

        // this should succeed
        args1[0] = "-cp";
        args1[1] = "/test/dir1";
        args1[2] = "/test/dir1foo";
        val = -1;
        try {
          val = shell.run(args1);
        } catch (Exception e) {
          System.err.println("Exception raised from DFSShell.run " +
                             e.getLocalizedMessage());
        }
        assertTrue(val == 0);
      }

    } finally {
      try {
        fileSys.close();
      } catch (Exception e) {
      }
      cluster.shutdown();
    }
  }

  static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
    List<File> files = new ArrayList<File>();
    List<DataNode> datanodes = cluster.getDataNodes();
    int nsId = cluster.getNameNode().getNamespaceID();
    Block[][] blocks = cluster.getAllBlockReports(nsId);
    for(int i = 0; i < blocks.length; i++) {
      FSDataset ds = (FSDataset)datanodes.get(i).getFSDataset();
      for(Block b : blocks[i]) {
        files.add(ds.getBlockFile(nsId,b));
      }        
    }
    return files;
  }

  static void corrupt(List<File> files) throws IOException {
    for(File f : files) {
      byte[] buffer = new byte[(int) f.length()];
      FileInputStream fis = new FileInputStream(f);
      fis.read(buffer, 0, buffer.length);
      fis.close();
      buffer[BlockInlineChecksumReader.getHeaderSize()]++;
      
      FileOutputStream fos = new FileOutputStream(f);
      fos.write(buffer, 0, buffer.length);
      fos.close();
    }
  }

  static interface TestGetRunner {
    String run(int exitcode, String... options) throws IOException;
  }

  public void testRemoteException() throws Exception {
    UnixUserGroupInformation tmpUGI = new UnixUserGroupInformation("tmpname",
        new String[] {
        "mygroup"});
    cluster = null;
    PrintStream bak = null;
    try {
      Configuration conf = new Configuration();
      cluster = new MiniDFSCluster(conf, 2, true, null);
      FileSystem fs = cluster.getFileSystem();
      Path p = new Path("/foo");
      fs.mkdirs(p);
      fs.setPermission(p, new FsPermission((short)0700));
      UnixUserGroupInformation.saveToConf(conf,
          UnixUserGroupInformation.UGI_PROPERTY_NAME, tmpUGI);
      FsShell fshell = new FsShell(conf);
      bak = System.err;
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      PrintStream tmp = new PrintStream(out);
      System.setErr(tmp);
      String[] args = new String[2];
      args[0] = "-ls";
      args[1] = "/foo";
      int ret = ToolRunner.run(fshell, args);
      assertTrue("returned should be -1", (ret == -1));
      String str = out.toString();
      assertTrue("permission denied printed", str.indexOf("Permission denied") != -1);
      out.reset();
    } finally {
      if (bak != null) {
        System.setErr(bak);
      }
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }

  public void testGet() throws IOException {
    DFSTestUtil.setLogLevel2All(FSInputChecker.LOG);
    final Configuration conf = new Configuration();
    cluster = new MiniDFSCluster(conf, 2, true, null);
    DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();

    try {
      final String fname = "testGet.txt";
      final File localf = createLocalFile(new File(TEST_ROOT_DIR, fname));
      final String localfcontent = DFSTestUtil.readFile(localf);
      final Path root = mkdir(dfs, new Path("/test/get"));
      final Path remotef = new Path(root, fname);
      dfs.copyFromLocalFile(false, false, new Path(localf.getPath()), remotef);

      final FsShell shell = new FsShell();
      shell.setConf(conf);
      TestGetRunner runner = new TestGetRunner() {
        private int count = 0;

        public String run(int exitcode, String... options) throws IOException {
          String dst = TEST_ROOT_DIR + "/" + fname+ ++count;
          String[] args = new String[options.length + 3];
          args[0] = "-get";
          args[args.length - 2] = remotef.toString();
          args[args.length - 1] = dst;
          for(int i = 0; i < options.length; i++) {
            args[i + 1] = options[i];
          }
          show("args=" + Arrays.asList(args));

          try {
            assertEquals(exitcode, shell.run(args));
          } catch (Exception e) {
            assertTrue(StringUtils.stringifyException(e), false);
          }
          return exitcode == 0? DFSTestUtil.readFile(new File(dst)): null;
        }
      };

      assertEquals(localfcontent, runner.run(0));
      assertEquals(localfcontent, runner.run(0, "-ignoreCrc"));

      //find and modify the block files
      List<File> files = getBlockFiles(cluster);
      show("files=" + files);
      corrupt(files);

      assertEquals(null, runner.run(-1));
      String corruptedcontent = runner.run(0, "-ignoreCrc");
      assertEquals(localfcontent.substring(1), corruptedcontent.substring(1));
      assertEquals(localfcontent.charAt(0)+1, corruptedcontent.charAt(0));

      localf.delete();
    } finally {
      try {dfs.close();} catch (Exception e) {}
      cluster.shutdown();
    }
  }

  public void testLsr() throws Exception {
    Configuration conf = new Configuration();
    cluster = new MiniDFSCluster(conf, 2, true, null);
    DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();

    try {
      final String root = createTree(dfs, "lsr");
      dfs.mkdirs(new Path(root, "zzz"));

      runLsCmd(new FsShell(conf), root, 0, "-lsr", null);

      final Path sub = new Path(root, "sub");
      dfs.setPermission(sub, new FsPermission((short)0));

      final UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
      final String tmpusername = ugi.getUserName() + "1";
      UnixUserGroupInformation tmpUGI = new UnixUserGroupInformation(
          tmpusername, new String[] {tmpusername});
      UnixUserGroupInformation.saveToConf(conf,
            UnixUserGroupInformation.UGI_PROPERTY_NAME, tmpUGI);
      String results = runLsCmd(new FsShell(conf), root, -1, "-lsr", null);
      assertTrue(results.contains("zzz"));
    } finally {
      cluster.shutdown();
    }
  }
  private static String runLsCmd(final FsShell shell, String root,
      int returnvalue, String cmd, String argument) throws Exception {
    System.out.println("root=" + root + ", returnvalue=" + returnvalue);
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    final PrintStream out = new PrintStream(bytes);
    final PrintStream oldOut = System.out;
    final PrintStream oldErr = System.err;
    System.setOut(out);
    System.setErr(out);
    final String results;
    try {
      if (argument != null) {
        assertEquals(returnvalue, shell.run(new String[]{cmd, argument, root}));
      } else {
        assertEquals(returnvalue, shell.run(new String[]{cmd, root}));
      }
      results = bytes.toString();
    } finally {
      IOUtils.closeStream(out);
      System.setOut(oldOut);
      System.setErr(oldErr);
    }
    System.out.println("results:\n" + results);
    return results;
  }
  
  public void testLsd() throws Exception {
    Configuration conf = new Configuration();
    cluster = new MiniDFSCluster(conf, 2, true, null);
    DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();

    try {
      final String root = createTree(dfs, "lsd");
      dfs.mkdirs(new Path(root, "testDir"));
      String results = runLsCmd(new FsShell(conf), root + "/testDir", 0, "-ls", "-d");
      assertTrue(results.contains("testDir"));
    } finally {
      cluster.shutdown();
    }
  }
}
